package io.sundial.coordination.curator;

import com.google.common.base.Preconditions;
import io.sundial.coordination.CallbackException;
import io.sundial.coordination.ConnStatus;
import io.sundial.coordination.Coordinator;
import io.sundial.coordination.CoordinatorException;
import io.sundial.coordination.node.Node;
import io.sundial.coordination.node.NodeEvent;
import io.sundial.coordination.node.NodeWatcher;
import io.sundial.coordination.tree.TreeEvent;
import io.sundial.coordination.tree.TreeWatcher;
import io.sundial.coordination.vote.VoteWatcher;
import io.sundial.coordination.watching.EventWatching;
import io.sundial.coordination.watching.StatedEventWatching;
import io.sundial.core.Callback;
import io.sundial.core.context.Context;
import io.sundial.core.lifecycle.Stateful;
import io.sundial.core.lifecycle.exception.DestroyingException;
import io.sundial.core.lifecycle.exception.InitializingException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

/**
 * Curator 节点服务实现
 *
 * @author Payne 646742615@qq.com
 * 2018/12/21 10:12
 */
public class CuratorCoordinator extends Stateful implements Coordinator {
    // Curator client used by every operation of this class. It has no initializer, so it stays
    // null until initializing() resolves it or setCurator() injects one; destroying() resets it to null.
    private CuratorFramework curator;

    /**
     * Resolves the Curator client and makes sure it has been started.
     * <p>
     * A client that is already set (for example through {@link #setCurator(CuratorFramework)})
     * is kept as is; otherwise the client is obtained from the context, passing a new
     * {@code CuratorSupplier} as the second argument of the lookup. The client is started only
     * when it is still in the {@code LATENT} state.
     *
     * @param context the context handed in by the lifecycle
     * @throws InitializingException as declared by the lifecycle contract
     */
    @Override
    protected void initializing(Context context) throws InitializingException {
        super.initializing(context);

        if (curator == null) {
            curator = context.get(CuratorFramework.class, new CuratorSupplier());
        }

        boolean notStartedYet = CuratorFrameworkState.LATENT == curator.getState();
        if (notStartedYet) {
            curator.start();
        }
    }

    /**
     * Releases the Curator client held by this coordinator.
     * <p>
     * The client is closed only when it is currently in the {@code STARTED} state, and the
     * reference is dropped afterwards. A coordinator whose client was never resolved
     * (the field is still {@code null}) is destroyed without error.
     *
     * @throws DestroyingException as declared by the lifecycle contract
     */
    @Override
    protected void destroying() throws DestroyingException {
        super.destroying();

        // Guard against a client that was never resolved: there is nothing to close in that case.
        if (curator != null && curator.getState() == CuratorFrameworkState.STARTED) {
            curator.close();
        }
        curator = null;
    }

    /**
     * Creates a node synchronously, creating missing parents on the way.
     *
     * @param path       path of the node to create
     * @param data       payload stored in the node
     * @param durable    {@code true} for a persistent node, {@code false} for an ephemeral one
     * @param sequential {@code true} to use the sequential variant of the chosen mode
     * @return the value returned by Curator's {@code forPath} call
     * @throws CoordinatorException wrapping any exception raised while creating the node
     */
    public String create(
            String path,
            byte[] data,
            boolean durable,
            boolean sequential
    ) throws CoordinatorException {
        try {
            // Pick the create mode from the two flags.
            final CreateMode mode;
            if (durable) {
                mode = sequential ? CreateMode.PERSISTENT_SEQUENTIAL : CreateMode.PERSISTENT;
            } else {
                mode = sequential ? CreateMode.EPHEMERAL_SEQUENTIAL : CreateMode.EPHEMERAL;
            }
            // Synchronous create: the call blocks until Curator returns the result.
            String created = curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(mode)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(path, data);
            return created;
        } catch (Exception cause) {
            throw new CoordinatorException(cause);
        }
    }

    /**
     * Creates a node in the background without a caller context.
     * <p>
     * Delegates to the overload that accepts a context, passing {@code null} for it.
     *
     * @param path       path of the node to create
     * @param data       payload stored in the node
     * @param durable    {@code true} for a persistent node, {@code false} for an ephemeral one
     * @param sequential {@code true} to use the sequential variant of the chosen mode
     * @param callback   receiver of the outcome
     * @throws CoordinatorException wrapping any exception raised while submitting the request
     */
    @Override
    public void create(String path, byte[] data, boolean durable, boolean sequential,
                       Callback<CreateResult, CallbackException> callback) throws CoordinatorException {
        this.create(path, data, durable, sequential, callback, null);
    }

    /**
     * Creates a node in the background, creating missing parents on the way.
     *
     * @param path       path of the node to create
     * @param data       payload stored in the node
     * @param durable    {@code true} for a persistent node, {@code false} for an ephemeral one
     * @param sequential {@code true} to use the sequential variant of the chosen mode
     * @param callback   receiver of the outcome, wrapped in a {@code CuratorCreateCallback}
     * @param context    caller-supplied object passed to Curator together with the background callback
     * @throws CoordinatorException wrapping any exception raised while submitting the request
     */
    @Override
    public void create(
            String path,
            byte[] data,
            boolean durable,
            boolean sequential,
            Callback<CreateResult, CallbackException> callback,
            Object context
    ) throws CoordinatorException {
        try {
            // Pick the create mode from the two flags.
            final CreateMode mode;
            if (durable) {
                mode = sequential ? CreateMode.PERSISTENT_SEQUENTIAL : CreateMode.PERSISTENT;
            } else {
                mode = sequential ? CreateMode.EPHEMERAL_SEQUENTIAL : CreateMode.EPHEMERAL;
            }
            // Asynchronous create: the outcome is reported to the background callback.
            BackgroundCallback background = new CuratorCreateCallback(callback);
            curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(mode)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .inBackground(background, context)
                    .forPath(path, data);
        } catch (Exception cause) {
            throw new CoordinatorException(cause);
        }
    }

    /**
     * Reads a node synchronously, together with its stat.
     *
     * @param path path of the node to read
     * @return a {@code CuratorNode} built from the path, the data read and the stat filled in by Curator
     * @throws CoordinatorException wrapping any exception raised while reading the node
     */
    public Node obtain(String path) throws CoordinatorException {
        try {
            // Curator fills this stat object while the data is being read.
            Stat nodeStat = new Stat();
            byte[] payload = curator.getData().storingStatIn(nodeStat).forPath(path);
            return new CuratorNode(path, payload, nodeStat);
        } catch (Exception cause) {
            throw new CoordinatorException(cause);
        }
    }

    /**
     * Reads a node in the background without a caller context.
     * <p>
     * Delegates to the overload that accepts a context, passing {@code null} for it.
     *
     * @param path     path of the node to read
     * @param callback receiver of the outcome
     * @throws CoordinatorException wrapping any exception raised while submitting the request
     */
    @Override
    public void obtain(
            String path,
            Callback<ObtainResult, CallbackException> callback
    ) throws CoordinatorException {
        this.obtain(path, callback, null);
    }

    /**
     * Reads a node in the background.
     *
     * @param path     path of the node to read
     * @param callback receiver of the outcome, wrapped in a {@code CuratorObtainCallback}
     * @param context  caller-supplied object passed to Curator together with the background callback
     * @throws CoordinatorException wrapping any exception raised while submitting the request
     */
    @Override
    public void obtain(
            String path,
            Callback<ObtainResult, CallbackException> callback,
            Object context
    ) throws CoordinatorException {
        try {
            BackgroundCallback background = new CuratorObtainCallback(callback);
            curator.getData().inBackground(background, context).forPath(path);
        } catch (Exception cause) {
            throw new CoordinatorException(cause);
        }
    }

    /**
     * Overwrites the data of a node synchronously.
     *
     * @param path    path of the node to update
     * @param data    new payload
     * @param version version handed to Curator's {@code withVersion}
     * @return a {@code CuratorNode} built from the path, the new data and the stat returned by Curator
     * @throws CoordinatorException wrapping any exception raised while updating the node
     */
    @Override
    public Node update(String path, byte[] data, int version) throws CoordinatorException {
        try {
            Stat updatedStat = curator.setData().withVersion(version).forPath(path, data);
            Node updated = new CuratorNode(path, data, updatedStat);
            return updated;
        } catch (Exception cause) {
            throw new CoordinatorException(cause);
        }
    }

    /**
     * Overwrites the data of a node in the background without a caller context.
     * <p>
     * Delegates to the overload that accepts a context, passing {@code null} for it.
     *
     * @param path     path of the node to update
     * @param data     new payload
     * @param version  version handed to Curator's {@code withVersion}
     * @param callback receiver of the outcome
     * @throws CoordinatorException wrapping any exception raised while submitting the request
     */
    @Override
    public void update(
            String path,
            byte[] data,
            int version,
            Callback<UpdateResult, CallbackException> callback
    ) throws CoordinatorException {
        this.update(path, data, version, callback, null);
    }

    /**
     * Overwrites the data of a node in the background.
     *
     * @param path     path of the node to update
     * @param data     new payload
     * @param version  version handed to Curator's {@code withVersion}
     * @param callback receiver of the outcome, wrapped in a {@code CuratorUpdateCallback}
     * @param context  caller-supplied object passed to Curator together with the background callback
     * @throws CoordinatorException wrapping any exception raised while submitting the request
     */
    @Override
    public void update(
            String path,
            byte[] data,
            int version,
            Callback<UpdateResult, CallbackException> callback,
            Object context
    ) throws CoordinatorException {
        try {
            BackgroundCallback background = new CuratorUpdateCallback(callback);
            curator.setData()
                    .withVersion(version)
                    .inBackground(background, context)
                    .forPath(path, data);
        } catch (Exception cause) {
            throw new CoordinatorException(cause);
        }
    }

    /**
     * Deletes a node synchronously.
     *
     * @param path       path of the node to delete
     * @param version    version handed to Curator's {@code withVersion}
     * @param guaranteed {@code true} to request Curator's {@code guaranteed()} delete
     * @param recursived {@code true} to request {@code deletingChildrenIfNeeded()}
     * @throws CoordinatorException wrapping any exception raised while deleting the node
     */
    @Override
    public void delete(String path, int version, boolean guaranteed, boolean recursived) throws CoordinatorException {
        try {
            // One branch per combination of the two options.
            if (guaranteed && recursived) {
                curator.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version).forPath(path);
            } else if (guaranteed) {
                curator.delete().guaranteed().withVersion(version).forPath(path);
            } else if (recursived) {
                curator.delete().deletingChildrenIfNeeded().withVersion(version).forPath(path);
            } else {
                curator.delete().withVersion(version).forPath(path);
            }
        } catch (Exception cause) {
            throw new CoordinatorException(cause);
        }
    }

    /**
     * Deletes a node in the background without a caller context.
     * <p>
     * Delegates to the overload that accepts a context, passing {@code null} for it.
     *
     * @param path       path of the node to delete
     * @param version    version handed to Curator's {@code withVersion}
     * @param guaranteed {@code true} to request Curator's {@code guaranteed()} delete
     * @param recursived {@code true} to request {@code deletingChildrenIfNeeded()}
     * @param callback   receiver of the outcome
     * @throws CoordinatorException wrapping any exception raised while submitting the request
     */
    @Override
    public void delete(
            String path,
            int version,
            boolean guaranteed,
            boolean recursived,
            Callback<DeleteResult, CallbackException> callback
    ) throws CoordinatorException {
        this.delete(path, version, guaranteed, recursived, callback, null);
    }

    /**
     * Deletes a node in the background.
     *
     * @param path       path of the node to delete
     * @param version    version handed to Curator's {@code withVersion}
     * @param guaranteed {@code true} to request Curator's {@code guaranteed()} delete
     * @param recursived {@code true} to request {@code deletingChildrenIfNeeded()}
     * @param callback   receiver of the outcome, wrapped in a {@code CuratorDeleteCallback}
     * @param context    caller-supplied object passed to Curator together with the background callback
     * @throws CoordinatorException wrapping any exception raised while submitting the request
     */
    @Override
    public void delete(
            String path,
            int version,
            boolean guaranteed,
            boolean recursived,
            Callback<DeleteResult, CallbackException> callback,
            Object context
    ) throws CoordinatorException {
        try {
            BackgroundCallback background = new CuratorDeleteCallback(callback);
            // One branch per combination of the two options.
            if (guaranteed && recursived) {
                curator.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version).inBackground(background, context).forPath(path);
            } else if (guaranteed) {
                curator.delete().guaranteed().withVersion(version).inBackground(background, context).forPath(path);
            } else if (recursived) {
                curator.delete().deletingChildrenIfNeeded().withVersion(version).inBackground(background, context).forPath(path);
            } else {
                curator.delete().withVersion(version).inBackground(background, context).forPath(path);
            }
        } catch (Exception cause) {
            throw new CoordinatorException(cause);
        }
    }

    /**
     * Joins the election on the given path and starts watching right away.
     * <p>
     * Delegates to {@link #elect(String, VoteWatcher, boolean)} with {@code now} set to {@code true}.
     *
     * @param path    election path
     * @param watcher receiver of election events
     * @return the watching handle
     * @throws CoordinatorException propagated from the delegated call
     */
    @Override
    public EventWatching elect(
            String path,
            VoteWatcher watcher
    ) throws CoordinatorException {
        return this.elect(path, watcher, true);
    }

    /**
     * Creates an election watching for the given path.
     *
     * @param path    election path
     * @param watcher receiver of election events
     * @param now     {@code true} to start the watching before returning it
     * @return the watching handle, started only when {@code now} is {@code true}
     * @throws CoordinatorException propagated from starting the watching
     */
    @Override
    public EventWatching elect(String path, VoteWatcher watcher, boolean now) throws CoordinatorException {
        final EventWatching voting = new CuratorVoteWatching(path, watcher);
        if (now) {
            voting.start();
        }
        return voting;
    }

    /**
     * Watches a single node and starts watching right away.
     * <p>
     * Delegates to {@link #watch(String, NodeWatcher, boolean)} with {@code now} set to {@code true}.
     *
     * @param path    path of the node to watch
     * @param watcher receiver of node events
     * @return the watching handle
     * @throws CoordinatorException propagated from the delegated call
     */
    @Override
    public EventWatching watch(
            String path,
            NodeWatcher watcher
    ) throws CoordinatorException {
        return this.watch(path, watcher, true);
    }

    /**
     * Creates a watching for a single node.
     *
     * @param path    path of the node to watch
     * @param watcher receiver of node events
     * @param now     {@code true} to start the watching before returning it
     * @return the watching handle, started only when {@code now} is {@code true}
     * @throws CoordinatorException propagated from starting the watching
     */
    @Override
    public EventWatching watch(String path, NodeWatcher watcher, boolean now) throws CoordinatorException {
        final EventWatching nodeWatching = new CuratorNodeWatching(path, watcher);
        if (now) {
            nodeWatching.start();
        }
        return nodeWatching;
    }

    /**
     * Watches a subtree and starts watching right away.
     * <p>
     * Delegates to {@link #watch(String, TreeWatcher, boolean)} with {@code now} set to {@code true}.
     *
     * @param root    root path of the subtree to watch
     * @param watcher receiver of tree events
     * @return the watching handle
     * @throws CoordinatorException propagated from the delegated call
     */
    @Override
    public EventWatching watch(
            String root,
            TreeWatcher watcher
    ) throws CoordinatorException {
        return this.watch(root, watcher, true);
    }

    /**
     * Creates a watching for a subtree.
     *
     * @param root    root path of the subtree to watch
     * @param watcher receiver of tree events
     * @param now     {@code true} to start the watching before returning it
     * @return the watching handle, started only when {@code now} is {@code true}
     * @throws CoordinatorException propagated from starting the watching
     */
    @Override
    public EventWatching watch(String root, TreeWatcher watcher, boolean now) throws CoordinatorException {
        final EventWatching treeWatching = new CuratorTreeWatching(root, watcher);
        if (now) {
            treeWatching.start();
        }
        return treeWatching;
    }

    /**
     * Curator election listener that forwards leadership and connection events to a {@link VoteWatcher}.
     */
    private class CuratorVoteListener implements LeaderSelectorListener {
        private final VoteWatcher watcher;

        /**
         * @param watcher receiver of the forwarded events
         * @throws IllegalArgumentException if {@code watcher} is {@code null}
         */
        CuratorVoteListener(VoteWatcher watcher) {
            Preconditions.checkArgument(watcher != null, "watcher must not be null");
            this.watcher = watcher;
        }

        /**
         * Notifies the watcher that this coordinator has been elected.
         */
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            watcher.onMyselfElected(CuratorCoordinator.this);
        }

        /**
         * Translates the Curator connection state and passes it to the watcher.
         * <p>
         * Leadership is cancelled by throwing {@link CancelLeadershipException} when the state is
         * unknown, when the watcher throws, or when the watcher returns {@code false}.
         */
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState state) {
            ConnStatus translated = toConnStatus(state);
            try {
                if (watcher.onStatusChanged(CuratorCoordinator.this, translated)) {
                    return;
                }
            } catch (Exception cause) {
                throw new CancelLeadershipException(cause);
            }
            // The watcher asked not to keep the leadership.
            throw new CancelLeadershipException();
        }

        /**
         * Maps a Curator connection state onto the coordinator's own status enum.
         */
        private ConnStatus toConnStatus(ConnectionState state) {
            switch (state) {
                case CONNECTED:
                    return ConnStatus.CONNECTED;
                case SUSPENDED:
                    return ConnStatus.SUSPENDED;
                case RECONNECTED:
                    return ConnStatus.RECONNECTED;
                case LOST:
                    return ConnStatus.LOST;
                case READ_ONLY:
                    return ConnStatus.READONLY;
                default:
                    throw new CancelLeadershipException("unknown connection state: " + state);
            }
        }
    }

    /**
     * Node-cache listener that wraps every change into a {@link NodeEvent} for a {@link NodeWatcher}.
     */
    private class CuratorNodeListener implements NodeCacheListener {
        private final String path;
        private final NodeWatcher watcher;
        private final CachedNode node;

        /**
         * @param path    path of the watched node
         * @param cache   cache backing the {@code CachedNode} handed to the watcher
         * @param watcher receiver of the node events
         * @throws NullPointerException if any argument is {@code null}
         */
        CuratorNodeListener(String path, NodeCache cache, NodeWatcher watcher) {
            // checkNotNull returns its argument, so validation and assignment are combined;
            // the checks still run in the order path, cache, watcher.
            this.path = Preconditions.checkNotNull(path, "path must not be null");
            NodeCache checkedCache = Preconditions.checkNotNull(cache, "cache must not be null");
            this.watcher = Preconditions.checkNotNull(watcher, "watcher must not be null");
            this.node = new CachedNode(path, checkedCache);
        }

        /**
         * Builds a fresh event around the cached node view and hands it to the watcher.
         */
        @Override
        public void nodeChanged() throws Exception {
            watcher.onWatched(new NodeEvent(path, node));
        }
    }

    private class CuratorTreeListener implements TreeCacheListener {
        private final String root;
        private final TreeWatcher watcher;
        private final CachedTree tree;

        CuratorTreeListener(String root, TreeCache cache, TreeWatcher watcher) {
            Preconditions.checkNotNull(root, "root must not be null");
            Preconditions.checkNotNull(cache, "cache must not be null");
            Preconditions.checkNotNull(watcher, "watcher must not be null");

            this.root = root;
            this.watcher = watcher;
            this.tree = new CachedTree(root, cache);
        }

        @Override
        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
            ChildData data = event.getData();
            Node node = data != null ? new CachedTree.Child(data.getPath(), data) : null;
            TreeEvent.Type type;
            switch (event.getType()) {
                case NODE_ADDED:
                    type = TreeEvent.Type.NODE_CREATED;
                    break;
                case NODE_UPDATED:
                    type = TreeEvent.Type.NODE_UPDATED;
                    break;
                case NODE_REMOVED:
                    type = TreeEvent.Type.NODE_REMOVED;
                    break;
                case CONNECTION_SUSPENDED:
                    type = TreeEvent.Type.CONN_SUSPENDED;
                    break;
                case CONNECTION_RECONNECTED:
                    type = TreeEvent.Type.CONN_RECONNECTED;
                    break;
                case CONNECTION_LOST:
                    type = TreeEvent.Type.CONN_LOST;
                    break;
                case INITIALIZED:
                    type = TreeEvent.Type.TREE_INITIALIZED;
                    break;
                default:
                    throw new IllegalStateException("unknown event type: " + event.getType());
            }
            TreeEvent treeEvent = new TreeEvent(root, type, tree, node);
            watcher.onWatched(treeEvent);
        }
    }

    /**
     * Election watching backed by a Curator {@link LeaderSelector}.
     * <p>
     * A new selector is created on every start. Pausing and closing both release the current
     * selector, and both are safe to call when no selector is active (never started, or
     * already released).
     */
    private class CuratorVoteWatching extends StatedEventWatching implements EventWatching {
        private final String path;
        private final VoteWatcher watcher;
        private volatile LeaderSelector selector;

        /**
         * @param path    election path
         * @param watcher receiver of election events
         */
        CuratorVoteWatching(String path, VoteWatcher watcher) {
            this.path = path;
            this.watcher = watcher;
        }

        /**
         * Creates a selector with auto-requeue enabled and starts it.
         */
        @Override
        protected void onStart() {
            LeaderSelector created = new LeaderSelector(
                    curator,
                    path,
                    new CuratorVoteListener(watcher)
            );
            created.autoRequeue();
            created.start();
            // Publish only a selector that actually started, so a failed start never
            // leaves an unstarted selector behind for a later close to trip over.
            selector = created;
        }

        @Override
        protected void onPause() {
            closeSelector();
        }

        @Override
        protected void onClose() {
            closeSelector();
        }

        /**
         * Closes the active selector, if any, and forgets it.
         * <p>
         * LeaderSelector.close() fails its state check when the selector was not started or
         * was already closed, so the reference is cleared to keep repeated calls harmless.
         */
        private void closeSelector() {
            LeaderSelector active = selector;
            selector = null;
            if (active != null) {
                active.close();
            }
        }
    }

    /**
     * Single-node watching backed by a Curator {@link NodeCache}.
     * <p>
     * A new cache is created on every start. Pausing and closing both close the current cache
     * and are safe to call when the watching was never started.
     */
    private class CuratorNodeWatching extends StatedEventWatching implements EventWatching {
        private final String path;
        private final NodeWatcher watcher;
        private volatile NodeCache cache;

        /**
         * @param path    path of the node to watch
         * @param watcher receiver of node events
         */
        CuratorNodeWatching(String path, NodeWatcher watcher) {
            this.path = path;
            this.watcher = watcher;
        }

        /**
         * Creates the cache, registers a listener that forwards to the watcher, and starts the cache.
         *
         * @throws CoordinatorException wrapping any exception raised while setting up the cache
         */
        @Override
        protected void onStart() throws CoordinatorException {
            try {
                cache = new NodeCache(curator, path);
                cache.getListenable().addListener(new CuratorNodeListener(path, cache, watcher));
                cache.start();
            } catch (Exception e) {
                throw new CoordinatorException(e);
            }
        }

        @Override
        protected void onPause() throws CoordinatorException {
            closeCache();
        }

        @Override
        protected void onClose() throws CoordinatorException {
            closeCache();
        }

        /**
         * Closes the cache if one has been created; does nothing when the watching was never started.
         *
         * @throws CoordinatorException wrapping any exception raised while closing the cache
         */
        private void closeCache() throws CoordinatorException {
            NodeCache current = cache;
            if (current == null) {
                return;
            }
            try {
                current.close();
            } catch (Exception e) {
                throw new CoordinatorException(e);
            }
        }
    }

    /**
     * Subtree watching backed by a Curator {@link TreeCache}.
     * <p>
     * A new cache is created on every start. Pausing and closing both close the current cache
     * and are safe to call when the watching was never started.
     */
    private class CuratorTreeWatching extends StatedEventWatching implements EventWatching {
        private final String path;
        private final TreeWatcher watcher;
        private volatile TreeCache cache;

        /**
         * @param path    root path of the subtree to watch
         * @param watcher receiver of tree events
         */
        CuratorTreeWatching(String path, TreeWatcher watcher) {
            this.path = path;
            this.watcher = watcher;
        }

        /**
         * Creates the cache, registers a listener that forwards to the watcher, and starts the cache.
         *
         * @throws CoordinatorException wrapping any exception raised while setting up the cache
         */
        @Override
        protected void onStart() throws CoordinatorException {
            try {
                cache = new TreeCache(curator, path);
                cache.getListenable().addListener(new CuratorTreeListener(path, cache, watcher));
                cache.start();
            } catch (Exception e) {
                throw new CoordinatorException(e);
            }
        }

        @Override
        protected void onPause() throws CoordinatorException {
            closeCache();
        }

        @Override
        protected void onClose() throws CoordinatorException {
            closeCache();
        }

        /**
         * Closes the cache if one has been created; does nothing when the watching was never started.
         *
         * @throws CoordinatorException wrapping any exception raised while closing the cache
         */
        private void closeCache() throws CoordinatorException {
            TreeCache current = cache;
            if (current == null) {
                return;
            }
            try {
                current.close();
            } catch (Exception e) {
                throw new CoordinatorException(e);
            }
        }
    }

    /**
     * Bridges Curator's background result of a create request to a coordinator {@link Callback}.
     */
    private class CuratorCreateCallback implements BackgroundCallback {
        private final Callback<CreateResult, CallbackException> callback;

        /**
         * @param callback receiver of the outcome; when {@code null} results are silently dropped
         */
        CuratorCreateCallback(Callback<CreateResult, CallbackException> callback) {
            this.callback = callback;
        }

        /**
         * Reports success with a {@code CreateResult}, or failure with a {@code CallbackException}
         * built from the result code and the caller context.
         */
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            if (callback == null) {
                return;
            }

            final int resultCode = event.getResultCode();
            if (resultCode == KeeperException.Code.OK.intValue()) {
                CreateResult created = new CreateResult(
                        event.getPath(),
                        event.getName(),
                        event.getData(),
                        event.getContext()
                );
                callback.call(true, created, null);
            } else {
                CallbackException failure = new CallbackException(resultCode, event.getContext());
                callback.call(false, null, failure);
            }
        }
    }

    /**
     * Bridges Curator's background result of a read request to a coordinator {@link Callback}.
     */
    private class CuratorObtainCallback implements BackgroundCallback {
        private final Callback<ObtainResult, CallbackException> callback;

        /**
         * @param callback receiver of the outcome; when {@code null} results are silently dropped
         */
        CuratorObtainCallback(Callback<ObtainResult, CallbackException> callback) {
            this.callback = callback;
        }

        /**
         * Reports success with an {@code ObtainResult} wrapping the node that was read, or failure
         * with a {@code CallbackException} built from the result code and the caller context.
         */
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            if (callback == null) {
                return;
            }

            final int resultCode = event.getResultCode();
            if (resultCode == KeeperException.Code.OK.intValue()) {
                Node fetched = new CuratorNode(event.getPath(), event.getData(), event.getStat());
                ObtainResult obtained = new ObtainResult(fetched, event.getContext());
                callback.call(true, obtained, null);
            } else {
                CallbackException failure = new CallbackException(resultCode, event.getContext());
                callback.call(false, null, failure);
            }
        }
    }

    /**
     * Bridges Curator's background result of an update request to a coordinator {@link Callback}.
     */
    private class CuratorUpdateCallback implements BackgroundCallback {
        private final Callback<UpdateResult, CallbackException> callback;

        /**
         * @param callback receiver of the outcome; when {@code null} results are silently dropped
         */
        CuratorUpdateCallback(Callback<UpdateResult, CallbackException> callback) {
            this.callback = callback;
        }

        /**
         * Reports success with an {@code UpdateResult} carrying the path and caller context, or
         * failure with a {@code CallbackException} built from the result code and the caller context.
         */
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            if (callback == null) {
                return;
            }

            final int resultCode = event.getResultCode();
            if (resultCode == KeeperException.Code.OK.intValue()) {
                UpdateResult updated = new UpdateResult(event.getPath(), event.getContext());
                callback.call(true, updated, null);
            } else {
                CallbackException failure = new CallbackException(resultCode, event.getContext());
                callback.call(false, null, failure);
            }
        }
    }

    /**
     * Bridges Curator's background result of a delete request to a coordinator {@link Callback}.
     */
    private class CuratorDeleteCallback implements BackgroundCallback {
        private final Callback<DeleteResult, CallbackException> callback;

        /**
         * @param callback receiver of the outcome; when {@code null} results are silently dropped
         */
        CuratorDeleteCallback(Callback<DeleteResult, CallbackException> callback) {
            this.callback = callback;
        }

        /**
         * Reports success with a {@code DeleteResult} carrying the path and caller context, or
         * failure with a {@code CallbackException} built from the result code and the caller context.
         */
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            if (callback == null) {
                return;
            }

            final int resultCode = event.getResultCode();
            if (resultCode == KeeperException.Code.OK.intValue()) {
                DeleteResult deleted = new DeleteResult(event.getPath(), event.getContext());
                callback.call(true, deleted, null);
            } else {
                CallbackException failure = new CallbackException(resultCode, event.getContext());
                callback.call(false, null, failure);
            }
        }
    }

    /**
     * Returns the Curator client currently held by this coordinator.
     *
     * @return the client, or {@code null} when none has been set or resolved
     */
    public CuratorFramework getCurator() {
        return this.curator;
    }

    /**
     * Replaces the Curator client held by this coordinator.
     * <p>
     * The previous client, if any, is neither closed nor otherwise touched by this method.
     *
     * @param curator the client to use from now on
     */
    public void setCurator(CuratorFramework curator) {
        this.curator = curator;
    }
}
